1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package rx.internal.operators;
18  
19  import static org.mockito.Matchers.any;
20  import static org.mockito.Mockito.*;
21  
22  import java.util.Arrays;
23  
24  import org.junit.*;
25  
26  import rx.*;
27  import rx.exceptions.TestException;
28  import rx.functions.Func1;
29  import rx.internal.util.UtilityFunctions;
30  import rx.observers.TestSubscriber;
31  ;
32  
33  public class OperatorTakeUntilPredicateTest {
34      @Test
35      public void takeEmpty() {
36          @SuppressWarnings("unchecked")
37          Observer<Object> o = mock(Observer.class);
38          
39          Observable.empty().takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
40          
41          verify(o, never()).onNext(any());
42          verify(o, never()).onError(any(Throwable.class));
43          verify(o).onCompleted();
44      }
45      @Test
46      public void takeAll() {
47          @SuppressWarnings("unchecked")
48          Observer<Object> o = mock(Observer.class);
49          
50          Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
51          
52          verify(o).onNext(1);
53          verify(o).onNext(2);
54          verify(o, never()).onError(any(Throwable.class));
55          verify(o).onCompleted();
56      }
57      @Test
58      public void takeFirst() {
59          @SuppressWarnings("unchecked")
60          Observer<Object> o = mock(Observer.class);
61          
62          Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
63          
64          verify(o).onNext(1);
65          verify(o, never()).onNext(2);
66          verify(o, never()).onError(any(Throwable.class));
67          verify(o).onCompleted();
68      }
69      @Test
70      public void takeSome() {
71          @SuppressWarnings("unchecked")
72          Observer<Object> o = mock(Observer.class);
73          
74          Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
75              @Override
76              public Boolean call(Integer t1) {
77                  return t1 == 2;
78              }
79          }).subscribe(o);
80          
81          verify(o).onNext(1);
82          verify(o).onNext(2);
83          verify(o, never()).onNext(3);
84          verify(o, never()).onError(any(Throwable.class));
85          verify(o).onCompleted();
86      }
87      @Test
88      public void functionThrows() {
89          @SuppressWarnings("unchecked")
90          Observer<Object> o = mock(Observer.class);
91          
92          Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
93              @Override
94              public Boolean call(Integer t1) {
95                  throw new TestException("Forced failure");
96              }
97          }).subscribe(o);
98          
99          verify(o).onNext(1);
100         verify(o, never()).onNext(2);
101         verify(o, never()).onNext(3);
102         verify(o).onError(any(TestException.class));
103         verify(o, never()).onCompleted();
104     }
105     @Test
106     public void sourceThrows() {
107         @SuppressWarnings("unchecked")
108         Observer<Object> o = mock(Observer.class);
109         
110         Observable.just(1)
111         .concatWith(Observable.<Integer>error(new TestException()))
112         .concatWith(Observable.just(2))
113         .takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
114         
115         verify(o).onNext(1);
116         verify(o, never()).onNext(2);
117         verify(o).onError(any(TestException.class));
118         verify(o, never()).onCompleted();
119     }
120     @Test
121     public void backpressure() {
122         TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
123             @Override
124             public void onStart() {
125                 request(5);
126             }
127         };
128         
129         Observable.range(1, 1000).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(ts);
130         
131         ts.assertNoErrors();
132         ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
133         Assert.assertEquals(0, ts.getOnCompletedEvents().size());
134     }
135 }